Class Consumer

High-level Kafka Consumer, receives messages from a Kafka cluster.

Requires Kafka >= 0.9.0.0.

Inheritance
System.Object
Handle
Consumer
Namespace: RdKafka
Assembly: cs.temp.dll.dll
Syntax
public class Consumer : Handle, IDisposable

Constructors

Consumer(Config, String)

Declaration
public Consumer(Config config, string brokerList = null)
Parameters
Type Name Description
Config config
System.String brokerList
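
As an illustration of the constructor, the following is a minimal sketch rather than a definitive recipe. It assumes that Config exposes a settable GroupId property and that a broker is reachable at localhost:9092; the group name and the address are placeholders that do not come from this page.

```csharp
using System;
using RdKafka;

class ConstructConsumerExample
{
    static void Main()
    {
        // Assumption: Config has a settable GroupId property.
        // "example-group" is a placeholder consumer group name.
        var config = new Config { GroupId = "example-group" };

        // Assumption: a broker is listening on localhost:9092.
        // Consumer implements IDisposable, so a using block releases it.
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            Console.WriteLine("Consumer created");
        }
    }
}
```

Because brokerList defaults to null, the second argument can presumably be omitted when the broker list is supplied through the configuration instead.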

Properties

Assignment

Returns the current partition assignment as set by Assign.

Declaration
public List<TopicPartition> Assignment { get; }
Returns
Type Description
System.Collections.Generic.List<TopicPartition>

Subscription

Returns the current topic subscription as set by Subscribe.

Declaration
public List<string> Subscription { get; }
Returns
Type Description
System.Collections.Generic.List<System.String>

Methods

Assign(List<TopicPartitionOffset>)

Update the assignment set to partitions.

The assignment set is the set of partitions actually being consumed by the consumer.

Declaration
public void Assign(List<TopicPartitionOffset> partitions)
Parameters
Type Name Description
System.Collections.Generic.List<TopicPartitionOffset> partitions
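
Manual assignment could look roughly like the sketch below. It assumes that TopicPartitionOffset has a (topic, partition, offset) constructor and that Config has a GroupId property; the topic name, group name, and broker address are placeholders.

```csharp
using System;
using System.Collections.Generic;
using RdKafka;

class AssignExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            // Assumption: TopicPartitionOffset(string topic, int partition, long offset).
            // Here partitions 0 and 1 of "example-topic" are requested from offset 0.
            var partitions = new List<TopicPartitionOffset>
            {
                new TopicPartitionOffset("example-topic", 0, 0),
                new TopicPartitionOffset("example-topic", 1, 0),
            };
            consumer.Assign(partitions);

            // The Assignment property returns the assignment set by Assign.
            foreach (TopicPartition tp in consumer.Assignment)
            {
                Console.WriteLine(tp);
            }

            // Stop consumption and remove the assignment again.
            consumer.Unassign();
        }
    }
}
```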

Commit()

Commit offsets for the current assignment.

Declaration
public Task Commit()
Returns
Type Description
Task

Commit(Message)

Commit the offset for a single topic and partition, based on the given message.

Declaration
public Task Commit(Message message)
Parameters
Type Name Description
Message message
Returns
Type Description
Task

Commit(List<TopicPartitionOffset>)

Commit an explicit list of offsets.

Declaration
public Task Commit(List<TopicPartitionOffset> offsets)
Parameters
Type Name Description
System.Collections.Generic.List<TopicPartitionOffset> offsets
Returns
Type Description
Task
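
The Commit overloads above return a Task, so the caller decides whether to wait for the commit to complete. The sketch below commits after a message has been handled; it assumes MessageAndError exposes Message and Error members and that ErrorCode.NO_ERROR is the success value, as the Consume description suggests. Topic, group, and broker address are placeholders.

```csharp
using System;
using System.Collections.Generic;
using RdKafka;

class CommitExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            consumer.Subscribe(new List<string> { "example-topic" });

            MessageAndError? result = consumer.Consume(TimeSpan.FromSeconds(5));

            // Assumption: MessageAndError has public Message and Error members.
            if (result.HasValue && result.Value.Error == ErrorCode.NO_ERROR)
            {
                // Commit the offset for this message's topic and partition,
                // blocking until the returned Task completes.
                consumer.Commit(result.Value.Message).Wait();
            }

            // Alternatively, commit offsets for the whole current assignment.
            consumer.Commit().Wait();
        }
    }
}
```

Blocking with Wait() keeps the sketch short; in asynchronous code the returned Task would normally be awaited instead.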

Consume(TimeSpan)

Manually consume a message or get an error. Calling this method also triggers events.

Events such as OnPartitionsAssigned, OnPartitionsRevoked, and OnOffsetCommit are invoked on the calling thread.

Returns one of:

  • a proper message (ErrorCode is NO_ERROR)
  • an error event (ErrorCode is not NO_ERROR)
  • null, if no message or event arrived within the timeout

Declaration
public MessageAndError? Consume(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout
Returns
Type Description
System.Nullable<MessageAndError>
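
The three possible outcomes of Consume can be handled with a loop along the following lines. This is a sketch under assumptions: MessageAndError is taken to expose Message and Error members, Message is taken to expose a byte[] Payload, and Config is taken to have a GroupId property. The topic, group, broker address, and iteration count are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RdKafka;

class ConsumeLoopExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            consumer.Subscribe(new List<string> { "example-topic" });

            // Poll a bounded number of times so the sketch terminates.
            for (int i = 0; i < 100; i++)
            {
                MessageAndError? result = consumer.Consume(TimeSpan.FromSeconds(1));

                if (!result.HasValue)
                {
                    // Timeout: no message or event arrived within one second.
                    continue;
                }

                // Assumption: public Message and Error members on MessageAndError.
                MessageAndError mae = result.Value;
                if (mae.Error == ErrorCode.NO_ERROR)
                {
                    // Assumption: Message.Payload is a byte[]; it may be null.
                    byte[] payload = mae.Message.Payload ?? new byte[0];
                    Console.WriteLine(Encoding.UTF8.GetString(payload, 0, payload.Length));
                }
                else
                {
                    // Error event: the code describes what happened.
                    Console.WriteLine($"Error event: {mae.Error}");
                }
            }
        }
    }
}
```

Since events fire on the thread that calls Consume, any registered event handlers run inside this loop as well.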

Dispose()

Declaration
public override void Dispose()

Position(List<TopicPartition>, TimeSpan)

Retrieve committed positions (offsets) for the given topics and partitions.

Declaration
public Task<List<TopicPartitionOffset>> Position(List<TopicPartition> partitions, TimeSpan timeout)
Parameters
Type Name Description
System.Collections.Generic.List<TopicPartition> partitions
System.TimeSpan timeout
Returns
Type Description
Task<System.Collections.Generic.List<TopicPartitionOffset>>
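
A call to Position might look like the sketch below. It assumes that TopicPartition has a (topic, partition) constructor and that Config has a GroupId property; the topic, group, broker address, and timeout are placeholders.

```csharp
using System;
using System.Collections.Generic;
using RdKafka;

class PositionExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            // Assumption: TopicPartition(string topic, int partition).
            var partitions = new List<TopicPartition>
            {
                new TopicPartition("example-topic", 0),
            };

            // Position returns a Task; Result blocks until it completes.
            List<TopicPartitionOffset> positions =
                consumer.Position(partitions, TimeSpan.FromSeconds(5)).Result;

            foreach (TopicPartitionOffset tpo in positions)
            {
                Console.WriteLine(tpo);
            }
        }
    }
}
```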

Subscribe(List<String>)

Update the subscription set to topics.

Any previous subscription will be unassigned and unsubscribed first.

The subscription set denotes the desired topics to consume. This set is provided to the partition assignor (one of the elected group members) for all clients. The assignor then uses the configured partition.assignment.strategy to assign the partitions of the subscribed topics to the consumers, depending on their subscriptions.

Declaration
public void Subscribe(List<string> topics)
Parameters
Type Name Description
System.Collections.Generic.List<System.String> topics
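
Subscribing and later unsubscribing could be sketched as follows, assuming a Config.GroupId property; the topic names, group name, and broker address are placeholders.

```csharp
using System;
using System.Collections.Generic;
using RdKafka;

class SubscribeExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            // Replaces any previous subscription with these two topics.
            consumer.Subscribe(new List<string> { "topic-a", "topic-b" });

            // The Subscription property returns the topics set by Subscribe.
            Console.WriteLine(string.Join(", ", consumer.Subscription));

            // Leave the current subscription set.
            consumer.Unsubscribe();
        }
    }
}
```

Subscribing only states which topics are wanted; which partitions this consumer actually receives is decided by the group's partition assignor.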

Unassign()

Stop consumption and remove the current assignment.

Declaration
public void Unassign()

Unsubscribe()

Unsubscribe from the current subscription set.

Declaration
public void Unsubscribe()

Events

OnOffsetCommit

Declaration
public event EventHandler<Consumer.OffsetCommitArgs> OnOffsetCommit

OnPartitionsAssigned

Declaration
public event EventHandler<List<TopicPartitionOffset>> OnPartitionsAssigned

OnPartitionsRevoked

Declaration
public event EventHandler<List<TopicPartitionOffset>> OnPartitionsRevoked
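
The events above can be wired up as in the following sketch. Several details are assumptions not stated on this page: that Consumer.OffsetCommitArgs exposes Error and Offsets members, that Config has a GroupId property, and that an application which registers rebalance handlers is expected to call Assign and Unassign itself. Topic, group, and broker address are placeholders.

```csharp
using System;
using System.Collections.Generic;
using RdKafka;

class EventsExample
{
    static void Main()
    {
        // Assumption: Config.GroupId exists; names and address are placeholders.
        var config = new Config { GroupId = "example-group" };
        using (var consumer = new Consumer(config, "localhost:9092"))
        {
            consumer.OnPartitionsAssigned += (sender, partitions) =>
            {
                Console.WriteLine($"Assigned: [{string.Join(", ", partitions)}]");
                // Assumption: the handler is responsible for applying the assignment.
                consumer.Assign(partitions);
            };

            consumer.OnPartitionsRevoked += (sender, partitions) =>
            {
                Console.WriteLine($"Revoked: [{string.Join(", ", partitions)}]");
                // Assumption: the handler is responsible for dropping the assignment.
                consumer.Unassign();
            };

            consumer.OnOffsetCommit += (sender, commit) =>
            {
                // Assumption: OffsetCommitArgs has Error and Offsets members.
                Console.WriteLine($"Commit result: {commit.Error}, " +
                                  $"offsets: [{string.Join(", ", commit.Offsets)}]");
            };

            consumer.Subscribe(new List<string> { "example-topic" });

            // Events are invoked on the thread that calls Consume,
            // so the handlers only run while this loop is polling.
            for (int i = 0; i < 30; i++)
            {
                consumer.Consume(TimeSpan.FromSeconds(1));
            }
        }
    }
}
```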